package flinkstudy.stream.async;

import flinkstudy.stream.environment.FlinkStreamExecutionEnvironment;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * @author daocr
 * @date 2020/2/6
 */
public class AsyncEsData {

    private FlinkStreamExecutionEnvironment flinkStreamExecutionEnvironment = null;

    @Before
    public void init() {
        flinkStreamExecutionEnvironment = new FlinkStreamExecutionEnvironment();
    }


    /**
     * 异步读取数据
     */
    @Test
    public void asyncV1() throws Exception {

        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        DataStreamSource<Tuple2<Integer, String>> source1 = env.addSource(new SourceFunction<Tuple2<Integer, String>>() {
            @Override
            public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
                while (true) {

                    ThreadLocalRandom current = ThreadLocalRandom.current();

                    int i = current.nextInt(1, 4);

                    ctx.collect(Tuple2.of(i, UUID.randomUUID().toString()));

                    Thread.sleep(100);
                }
            }

            @Override
            public void cancel() {

            }
        });

        SingleOutputStreamOperator<Tuple3<Integer, String, String>> tuple3SingleOutputStreamOperator = AsyncDataStream.unorderedWait(source1, new AsyncEsDataRequest(), 10, TimeUnit.SECONDS);

        tuple3SingleOutputStreamOperator.print();

        env.execute();

    }

    public static class AsyncEsDataRequest extends RichAsyncFunction<Tuple2<Integer, String>, Tuple3<Integer, String, String>> {

        @Override
        public void asyncInvoke(Tuple2<Integer, String> input, ResultFuture<Tuple3<Integer, String, String>> resultFuture) throws Exception {

            HashMap<Integer, String> map = new HashMap<>();

            map.put(1, "湖南");
            map.put(2, "湖北");
            map.put(3, "上海");

            ThreadLocalRandom current = ThreadLocalRandom.current();

            Thread.sleep(current.nextLong(1000, 3000));

            resultFuture.complete(Arrays.asList(Tuple3.of(input.f0, input.f1, map.get(input.f0))));

        }
    }


}
